import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}

object CleanData {

  /**
   * Cleans the Chicago crimes CSV dataset.
   *
   * Input columns (by position):
   * ID, Case Number, Date, Block, IUCR, Primary Type, Description, Location Description,
   * Arrest, Domestic, Beat, District, Ward, Community Area, FBI Code, X Coordinate,
   * Y Coordinate, Year, Updated On, Latitude, Longitude, Location.
   *
   * Pipeline: drop rows containing any null column, de-duplicate, project to the
   * useful columns (dropping X/Y coordinates and Latitude/Longitude), then count.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("cleanData").setMaster("local[8]")
    // Spark tuning notes: https://www.jianshu.com/p/9b243c0a7410
    // NOTE(review): spark.driver.memory set programmatically has no effect in local
    // mode — the driver JVM is already running; pass it via spark-submit/JVM options.
    conf.set("spark.executor.memory", "6G")
      .set("spark.driver.memory", "4G")
      .set("spark.executor.cores", "1")
      .set("spark.yarn.am.waitTime", "1000s")

    // A single SparkSession wraps the SparkContext and replaces the deprecated
    // SQLContext; creating `new SparkContext(conf)` and `new SQLContext(sc)` in
    // addition to the session (as before) was redundant and only worked because
    // getOrCreate() silently reused the already-running context.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // Built-in CSV source; "com.databricks.spark.csv" is the pre-Spark-2.0 alias.
    val dataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("encoding", "gbk")
      .csv("D:\\Dataset\\Crimes_-_2001_to_20211126.csv")

    // Drop rows where ANY of the 22 columns is null — equivalent to the previous
    // per-field null-check lambda, but expressed declaratively so Spark can
    // optimize it instead of deserializing every Row into a Scala closure.
    val df2 = dataFrame.na.drop("any")

    // Remove exact duplicate rows.
    val df3 = df2.distinct()

    // Project to the useful fields, dropping X Coordinate (15), Y Coordinate (16),
    // Latitude (19) and Longitude (20). Numeric fields are parsed defensively:
    // with inferSchema a numeric column may come back as "12.0", and a bare
    // .toInt on that string would abort the whole job with NumberFormatException.
    val df4 = df3.map { row =>
      // Stringify a column regardless of the inferred runtime type.
      def str(i: Int): String = String.valueOf(row.get(i))
      // Parse an integral column, tolerating decimal renderings like "12.0".
      def int(i: Int): Int = str(i).trim.toDouble.toInt

      val id                  = str(0)
      val caseNumber          = str(1)
      val date                = str(2)
      val block               = str(3)
      val iucr                = str(4)
      val primaryType         = str(5)
      val description         = str(6)
      val locationDescription = str(7)
      val arrest              = row.getAs[Boolean](8)
      val domestic            = row.getAs[Boolean](9)
      val beat                = int(10)
      val district            = int(11)
      val ward                = int(12)
      val communityArea       = int(13)
      val fbiCode             = str(14)
      val year                = str(17)
      val updatedOn           = str(18)
      val location            = str(21)

      (id, caseNumber, date, block, iucr, primaryType, description, locationDescription,
        arrest, domestic, beat, district, ward, communityArea, fbiCode, year, updatedOn, location)
    }

    println("begin  collect....................")

    // count() is the action that triggers the whole pipeline above.
    val count = df4.count()
    println("rows size......." + count)

    // Release resources; previously the app only terminated via JVM exit.
    spark.stop()
  }

}
